跳到主要内容

Redis 实战:构建分布式锁

乐观锁的问题

现在来回顾一下之前那个那个 《使用事务构建一个游戏商城》 商品的购物过程。当玩家在市场上购买商品的时候,程序首先需要使用 WATCH 去监视市场以及买家的个入信息散列,在得知买家现有的钱数以及商品的售价之后,程序会验证买家是否有足够的钱来购买指定的商品:

// 开始交易前先对市场以及买家的个入信息进行监控
conn.watch("market:", buyer); // 这个 watch 可以监视多个 key(这里同时监视玩家背包和商城)

如果买家有足够的钱,那么程序会将买家支付的钱转移给卖家,接着将商品添加到买家的包裹里面,并从市场里面移除已被售出的商品

相反地,如果买家没有足够的钱来购买商品,那么程序就会取消事务。在执行购买操作的过程中,如果有其他玩家对市场进行了改动,或者因为记录买家个入信息的散列出现了变化而引发了 WATCH 错误,那么程序将重新执行购买操作。

无论事务是否回滚,Redis 都会去取消执行事务前的 watch 命令,这个过程如图 1 所示。

Redis 参考了多线程中 使用的 CAS(比较与交换,Compare And Swap)去执行的。在数据高并发环境的操作中,我们把这样的一个机制称为乐观锁。

但是,它有个很严重的问题,虽然它不阻塞线程,但是可能会因为冲突过多,导致重试的次数也过多,为了展示锁对于性能扩展的必要性,我们会模拟市场在 3 种不同负载情况下的性能表现,这 3 种情况分别是:

  • 1 个玩家出售商品,另 1 个玩家购买商品;
  • 5 个玩家出售商品,另 1 个玩家购买商品;
  • 以及 5 个玩家出售商品,另外 5 个玩家购买商品。

表 6-1 展示了模拟的结果。

根据表 6-1 的模拟结果显示,随着负载不断增加,系统完成一次交易所需的重试次数从最初的 3 次上升到了 250 次,与此同时,完成一次交易所需的等待时间也从最初的少于 10ms 上升到了 500ms。

这个模拟示例完美地展示了为什么 WATCH、MULTI 和 EXEC 组成的事务并不具有可扩展性,原因在于程序在尝试完成一个事务的时候,可能会因为事务执行失败而反复地进行重试。保证数据的正确性是一件非常重要的事情,但使用 WATCH 命令的做法并不完美。

为了解决这个问题,并以可扩展的方式来处理市场交易,我们将使用锁来保证市场在任一时 刻只能上架或者销售一件商品。

锁设计时需要考虑的问题

  1. 持有锁的进程因为操作时间过长而导致锁被自动释放,但进程本身并不知晓这一点,甚至还可能会错误地释放掉了其他进程持有的锁。

  2. 一个持有锁并打算执行长时间操作的进程已经崩溃,但其他想要获取锁的进程不知道哪个进程持有着锁,也无法检测出持有锁的进程已经崩溃,只能白白地浪费时间等待锁被释放。

  3. 在一个进程持有的锁过期之后,其他多个进程同时尝试去获取锁,并且都获得了锁。

上面提到的第一种情况和第三种情况同时出现,导致有多个进程获得了锁,而每个进程都以为自己是唯一一个获得锁的进程。

因为 Redis 在最新的硬件上可以每秒执行 100000 个操作,而在高端的硬件上甚至可以每秒执行将近 225 000 个操作,所以尽管上面提到的问题出现的几率只有万分之一,但这些问题在高负载的情况下还是有可能会出现,因此,让锁正确地运作起来仍然是一件相当重要的事情。

补充知识:Setnx 命令

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值,但是这个值已经存在时,会覆盖失败

redis Setnx 命令基本语法如下:

redis 127.0.0.1:6379> SETNX KEY_NAME VALUE

返回值

  • 设置成功,返回 1
  • 设置失败,返回 0

实例

redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer" # job 设置成功
(integer) 1

redis> SETNX job "code-farmer" # 尝试覆盖 job ,失败
(integer) 0

redis> GET job # 没有被覆盖
"programmer"

构建一个基本的锁

获取锁

为了对数据进行排他性访问,程序首先要做的就是获取锁。SETNX 命令天生就适合用来实现锁的获取功能,这个命令只会在键不存在的情况下为键设置值,而锁要做的就是将一个随机生成的 128 位 UUID 设置为键的值,并使用这个值来防止锁被其他进程取得。

如果程序在尝试获取锁的时候失败,那么它将不断地进行重试(这里就不是消费 Redis 的性能,而是应用的性能了),直到成功地取得锁或者超过给定的时限为止。

/**
* 尝试获取锁
* @param conn Redis 连接
* @param lockName 要锁的对象名称
* @param acquireTimeout 等待获取锁的时间
* @return 返回 null 表示获取锁失败
*/
public String acquireLock(Jedis conn, String lockName, long acquireTimeout) {
// 生成一个 UUID
String identifier = UUID.randomUUID().toString();
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
// 尝试获取锁,只有获取成功才会返回 1
if (conn.setnx("lock:" + lockName, identifier) == 1) {
return identifier;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
return null;
}

在实现了锁之后,我们就可以使用锁来代替针对市场的 WATCH 操作了。这里使用锁重新实现的商品购买操作:程序首先对市场进行加锁,接着检查商品的价格,并在确保买家有足够的钱来购买商品之后,对钱和商品进行相应的转移。

释放锁

当操作执行完毕之后,程序就会释放锁。所以还需要编写一个释放锁的操作

/**
* 释放锁
*
* @param conn Redis 连接
* @param lockName 要释放锁的对象名称
* @param identifier 标识符(之前生成的 UUID)
* @return 是否释放锁
*/
public boolean releaseLock(Jedis conn, String lockName, String identifier) {
String lockKey = "lock:" + lockName;
while (true) {
// 这里还是使用乐观锁来释放锁
conn.watch(lockKey);
if (identifier.equals(conn.get(lockKey))) { // 检查进程是否仍然持有锁
Transaction trans = conn.multi();
trans.del(lockKey); // 释放锁
List<Object> results = trans.exec();
// 如果为空则说明其它客户修改了锁,需要重试
if (results == null) {
continue;
}
// 因为已经删除了这个 key 了,所以也无需 unwatch
return true;
}
conn.unwatch();
break;
}
return false;
}

需要注意的一点是,对于目前的锁实现来说,releaseLock() 函数包含的无限循环只会在极少数情况下用到,这个函数之所以包含这个无限循环,主要是因为之后介绍的锁实现会支持超时限制特性,而如果用户不小心地混合使用了两个版本的锁,就可能会引起解锁事务失败,并导致上锁时间被不必要地延长。尽管这种情况并不常见,但为了保证解锁操作在各种情况下都能够正确地执行,我们还是选择在一开始就把这个无限循环添加到 releaseLock() 函数里面。

重写购物方法

然后使用这个锁重写之前那个购物的方法

/**
* 购买商品
*
* @param conn Redis 连接
* @param buyerId 购买者 ID
* @param itemId 商品 ID
* @param sellerId 卖方 ID
* @param lprice 交易价格
* @return 是否购买成功
*/
public boolean purchaseItemWithLock(Jedis conn, String buyerId, String itemId, String sellerId, double lprice) {
String buyer = "users:" + buyerId;
String seller = "users:" + sellerId;
String item = itemId + '.' + sellerId;
String inventory = "inventory:" + buyerId;
// 这里令这个锁的名称为 "market"
String locked = acquireLock(conn, "market", 10000);
if (locked == null) {
throw new RuntimeException("Couldn't get the lock");
}
try {
// 检查买家想要购买的商品的价格是否出现变化,以及买家是否有足够的钱来购买这件商品
double price = Optional.ofNullable(conn.zscore("market:", item)).orElseThrow(() -> new RuntimeException("当前商品已经被买走了"));
// 取得买方当前的现金
double funds = Double.parseDouble(conn.hget(buyer, "funds"));
if (price != lprice || price > funds) {
return false;
}
// 上面的判断都没有问题则开启事务,将买方的钱转给卖方
Pipeline pipelined = conn.pipelined();
pipelined.hincrBy(seller, "funds", (int) price);
pipelined.hincrBy(buyer, "funds", (int) -price);
// 将被购商品放入买家背包里面
pipelined.sadd(inventory, itemId);
// 从市场移除这个 item
pipelined.zrem("market:", item);
pipelined.sync();
} finally {
// 释放锁
releaseLock(conn, "market", locked);
}
return true;
}

在使用锁代替 WATCH 重新实现商品购买操作之后,我们可以再次进行之前的商品买卖模拟操作:表 6-2 中的单数行展示了 WATCH 实现的模拟结果,而表中的复数行则展示了在与前一行条件相同的情况下,锁实现的模拟结果。

与之前的 WATCH 实现相比,锁实现的上架商品数量虽然有所减少,但是在买入商品时却不需要进行重试,并且上架商品数量和买入商品数量之间的比率,也跟卖家数量和买家数量之间的比率接近。目前来说,不同上架和买入线程之间的竞争限制了商品买卖操作性能的进一步提升,而接下来介绍的细粒度锁将解决这个问题。

这一节的测试代码

https://gist.github.com/c40a662038a2d1f54956f4ed19bb5f47

执行后打印的结果

细粒度锁

在前面介绍锁实现以及加锁操作的时候,我们考虑的是如何实现与 WATCH 命令粒度相同的锁,这种锁可以把整个市场都锁住。因为我们是自己动手来构建锁实现,并且我们关心的不是整个市场,而是市场里面的某件商品是否存在,所以我们实际上可以将加锁的粒度变得更细一些。通过只锁住被买卖的商品而不是整个市场,可以减少锁竞争出现的几率并提升程序的性能。

表6-3展示了使用只对单个商品进行加锁的锁实现之后,进行与表6-2所示相同的模拟时的结果。

表 6-3 中的模拟结果显示,在使用细粒度锁的情况下,无论有多少个上架进程和买入进程在运行,程序总能在 60 秒内完成 220 000~230 000 次的上架和买入操作,并且不会引发任何重试操作。

除此之外,买入操作的延迟时间即使在高负载情况下也不会超过3毫秒。在使用细粒度锁的时候买卖操作的执行次数比率跟买家数量和卖家数量之间的比率基本一致,这和使用粗粒度锁时的情况非常相似。

更关键的是,锁可以有效地避免 WATCH 实现因为买入操作竞争过多而导致延迟剧增甚至无法执行的问题。

只需简单的把上面 "market" 改成商品的名称就行了

String locked = acquireLock(conn, "itemX", 10000);

带有超时的锁

前面提到过,目前的锁实现在持有者崩溃的时候不会自动被释放,这将导致锁一直处于已被获取的状态。为了解决这个问题,在这一节中, 我们将为锁加上超时功能。

为了给锁加上超时限制特性,程序将在取得锁之后,调用 EXPIRE 命令来为锁设置过期时间,使得 Redis 可以自动删除超时的锁。为了确保锁在客户端已经崩溃(客户端在执行介于 SETNX 和 EXPIRE 之间的时候崩溃是最糟糕的)的情况下仍然能够自动被释放,客户端会在尝试获取锁失败之后,检查锁的超时时间,并为未设置超时时间的锁设置超时时间。

因此锁总会带有超时时间,并最终因为超时而自动被释放,使得其他客户端可以继续尝试获取已被释放的锁。

需要注意的一点是, 因为多个客户端在 同一时间内设置的超时时间基本上都是相同的,所以即使有多个客户端同时为同一个锁设置超时时间,锁的超时时间也不会产生太大变化。

/**
* 带超时的锁
* @param conn Redis 连接
* @param lockName 要锁的对象名称
* @param acquireTimeout 等待获取锁的时间
* @param lockTimeout 超时时间(秒为单位)
* @return 返回 null 表示获取锁失败,否则返回生成 UUID 标识
*/
public String acquireLockWithTimeout(Jedis conn, String lockName,
long acquireTimeout, long lockTimeout) {
String identifier = UUID.randomUUID().toString();
String lockKey = "lock:" + lockName;
int lockExpire = (int) (lockTimeout / 1000);

long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
// 如果成功获取锁
if (conn.setnx(lockKey, identifier) == 1) {
conn.expire(lockKey, lockExpire);
return identifier;
}
// 获取锁失败(先调用 ttl 检查过期时间,并在有需要时对其进行更新)
// ttl 方法以毫秒为单位,返回 key 的剩余生存时间
// 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1
// 注意:在 Redis 2.8 以前,当 key 不存在,或者 key 没有设置剩余生存时间时,命令都返回 -1 。
if (conn.ttl(lockKey) == -1) {
conn.expire(lockKey, lockExpire);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
// null indicates that the lock was not acquired
return null;
}